{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Write custom deserializer\n",
    " \n",
    "In this manual we describe how to load data using CNTK custom deserializers. CNTK also provides other means for loading data (i.e. built-in deserializers, user defined minibatch sources or feeding NumPy data explicitly), for more details please have a look at the [How to feed data](Manual_How_to_feed_data.ipynb) manual.\n",
    "\n",
    "So, why use custom deserializers in the first place? The main reason is that you have some custom format that CNTK does not support out of the box and you would like to load your data efficiently.\n",
    "\n",
    "Custom deserializers give the user the following advantages:\n",
    "1. Flexibility: the user can deserialize any format she wants.\n",
    "1. Simplicity: writing a new deserializer is relatively easy. \n",
    "1. Efficiency: the data will be prefetched automatically (when possible) and moved to GPU memory on a separate thread. In conjunction with Function.train API this allows the main Python thread to concentrate fully on deserialization and IO prefetching.\n",
    "1. Randomization: each new sweep the data will be randomized.\n",
    "1. Checkpointing: checkpoints are supported out of the box.\n",
    "1. Distribution: CNTK will know how to distribute chunks to different workers in a distributed environment.\n",
    "\n",
    "**_Please note, that due to CPython limitations only a single thread is allowed to interpret Python script at any point in time, so if you perform some heavy CPU work during deserialization this will still influence your performance because it cannot be effectively parallelized. We recommend using built-in deserializers for CPU hungry workloads._**\n",
    "\n",
    "We start with some imports we need for the rest of this manual: "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from __future__ import print_function\n",
    "from __future__ import division\n",
    "import os\n",
    "import sys\n",
    "import io\n",
    "import cntk\n",
    "import cntk.ops\n",
    "import cntk.io\n",
    "import cntk.train\n",
    "import pandas as pd\n",
    "import numpy as np\n",
    "import random\n",
    "import math\n",
    "from scipy import sparse as sp\n",
    "import cntk.tests.test_utils\n",
    "from cntk.io import MinibatchSource\n",
    "\n",
    "cntk.tests.test_utils.set_device_from_pytest_env() # (only needed for our build system)\n",
    "cntk.cntk_py.set_fixed_random_seed(1) # fix the random seed so that LR examples are repeatable"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 1. Main concepts\n",
    "\n",
    "The main responsibility of a deserializer is to take serialized data from an external storage and create an in-memory representation of the sequence (or sample) that can be consumed by the network.\n",
    "The interface of the deserializer is simple and consists of three main methods:\n",
    " - __stream_infos__: similarly to built-in deserializers, this function returns a list of streams this deserializer will provide. Each stream is described by its name, data type, format and shape\n",
    " - __num_chunks__: returns the number of data chunks.To make IO efficient the deserializer does not operate on a single sequence, instead it operates in chunks. A chunk is just a set of sequences that can be read in an efficient manner (for example, in case of CSV file on disk, it makes sense to read 32 or 64MBs in one go).\n",
    " - __get_chunk(chunk_id)__: given a chunk identifier (0 <= __chunk_id__ < __num_chunks__) the deserializer should return an array or a CSR matrix of samples/sequences.\n",
    "\n",
    "Let's implement a simple custom deserializer that will hold all its data in memory as a single chunk:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from cntk.io import UserDeserializer\n",
    "\n",
    "# This class will take a simple dictionary of {name => (data)} in the constructor\n",
    "# and will expose a single chunk.\n",
    "class FromData(UserDeserializer):\n",
    "    def __init__(self, data_streams):\n",
    "        super(FromData, self).__init__()\n",
    "        if not data_streams:\n",
    "            raise(ValueError('at least one stream must be specified, in the form name=data'))\n",
    "\n",
    "        self._data = data_streams   # [name] -> numpy.array or scipy.sparse.csr_matrix or list of those\n",
    "        self._streams = []          # meta information about exposed stream\n",
    "        num_sequences = -1          # total number of sequences (can be of length 1 in sample mode)\n",
    "                                    # must be the same across all streams\n",
    "\n",
    "        # Infer the meta information about streams\n",
    "        for name, value in data_streams.items():\n",
    "            is_sequence = isinstance(value, list) # is list - single elements are considered sequences\n",
    "\n",
    "            # Infer sparsity\n",
    "            element = value[0] if is_sequence else value\n",
    "            if isinstance(element, np.ndarray):\n",
    "                is_sparse = False\n",
    "            elif isinstance(element, sp.csr_matrix):\n",
    "                is_sparse = True\n",
    "            else:\n",
    "                raise TypeError('data must be a numpy.array or scipy.sparse.csr_matrix, or a list of those')\n",
    "\n",
    "            # Infer sample shape\n",
    "            sample_shape = value[0].shape[1:] if is_sequence else value.shape[1:]\n",
    "\n",
    "            # Check that the number of sequences across all streams is the same\n",
    "            stream_num_sequences = len(value) if is_sequence else value.shape[0]            \n",
    "            if num_sequences == -1:\n",
    "                if stream_num_sequences == 0:\n",
    "                    raise(ValueError('data is empty'))\n",
    "                num_sequences = stream_num_sequences\n",
    "            elif stream_num_sequences != num_sequences:\n",
    "                raise ValueError('all data items must have the same first dimension')\n",
    "\n",
    "            self._streams.append(dict(name = name, shape = sample_shape, is_sparse = is_sparse))\n",
    "    \n",
    "    # Return meta information about streams\n",
    "    def stream_infos(self):\n",
    "        return [cntk.io.StreamInformation(stream['name'], index, ['dense', 'sparse'][stream['is_sparse']], \n",
    "                                          np.float32, stream['shape'])\n",
    "                for index, stream in enumerate(self._streams)]\n",
    "\n",
    "    # We have a single chunk only  \n",
    "    def num_chunks(self):\n",
    "        return 1\n",
    "\n",
    "    # actually return out chunk data as a dictionary name => data\n",
    "    # where the data is a list of sequences or a csr_matrix/ndarray of samples\n",
    "    def get_chunk(self, chunk_id):\n",
    "        if chunk_id != 0:\n",
    "            raise ValueError(\"Unexpected chunk id\")\n",
    "        return self._data\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As can be seen above the main work is done in the constructor, where given the data we infer the information about the exposed streams. The implementation of __get_chunk__ and __num_chunk__ is degenerate for this case because we have a single chunk only. \n",
    "\n",
    "The chunk is a dictionary that as keys contains the names of the streams and as values either a list of sequences or a NumPy array/CSR matrix (in sample mode when all sequences are of length 1).\n",
    "\n",
    "Now given the defined above deserializer we can simply create a minibatch source with or without randomization:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Non randomized\n",
      "Sparse\n",
      "[[[ 1.  0.  0.]]\n",
      "\n",
      " [[ 0.  2.  0.]]\n",
      "\n",
      " [[ 0.  0.  3.]]]\n",
      "Dense\n",
      "[[[ 0.  1.  2.]]\n",
      "\n",
      " [[ 3.  4.  5.]]\n",
      "\n",
      " [[ 6.  7.  8.]]]\n",
      "Randomized\n",
      "Sparse\n",
      "[[[ 0.  5.  0.]]\n",
      "\n",
      " [[ 1.  0.  0.]]\n",
      "\n",
      " [[ 4.  0.  0.]]]\n",
      "Dense\n",
      "[[[ 12.  13.  14.]]\n",
      "\n",
      " [[  0.   1.   2.]]\n",
      "\n",
      " [[  9.  10.  11.]]]\n"
     ]
    }
   ],
   "source": [
    "# Dense and sparse samples non randomized\n",
    "print('Non randomized')\n",
    "N = 5\n",
    "X = np.arange(3*N).reshape(N,3).astype(np.float32) # 5 rows of 3 values\n",
    "Y = sp.csr_matrix(np.array([[1, 0, 0], \n",
    "                            [0, 2, 0], \n",
    "                            [0, 0, 3], \n",
    "                            [4, 0, 0], \n",
    "                            [0, 5, 0]], dtype=np.float32))\n",
    "\n",
    "mbs = MinibatchSource([FromData(dict(x=X, y=Y))], randomize=False)\n",
    "mb = mbs.next_minibatch(3)\n",
    "result = mb[mbs.streams['y']].data.asarray()\n",
    "assert (result == np.array([[[ 1, 0, 0]],\n",
    "                            [[ 0, 2, 0]],\n",
    "                            [[ 0, 0, 3]]], dtype=np.float32)).all()\n",
    "print('Sparse')\n",
    "print(result)\n",
    "\n",
    "result = mb[mbs.streams['x']].data.asarray()\n",
    "assert (result == np.array([[[ 0.,  1.,  2.]],\n",
    "                            [[ 3.,  4.,  5.]],\n",
    "                            [[ 6.,  7.,  8.]]],dtype=np.float32)).all()\n",
    "print('Dense')\n",
    "print(result)\n",
    "\n",
    "print('Randomized')\n",
    "mbs1 = MinibatchSource([FromData(dict(x=X, y=Y))], randomize=True)\n",
    "mb1 = mbs1.next_minibatch(3)\n",
    "print('Sparse')\n",
    "print(mb1[mbs1.streams['y']].data.asarray())\n",
    "print('Dense')\n",
    "print(mb1[mbs1.streams['x']].data.asarray())\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2. Processing big files\n",
    "\n",
    "The sample above though simple was only useful for data that can fit in memory. Let's see how we can implement a deserializer that would allow us to ingest data that exceeds our memory.\n",
    "\n",
    "Let's generate a CSV file with 200 thousands lines (you can use adjust the number of rows as you see fit). Each line will have 150 features and a single label (151 columns in total):\n",
    "\n",
    "|x1|..|x150|y|\n",
    "|:-|:-|:-|:-|\n",
    "|0|0|0|0|\n",
    "|1|1|1|0|\n",
    "|...|...|...|...|\n",
    "|199999|199999|...|199999|\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0 records generated\n",
      "20000 records generated\n",
      "40000 records generated\n",
      "60000 records generated\n",
      "80000 records generated\n",
      "100000 records generated\n",
      "120000 records generated\n",
      "140000 records generated\n",
      "160000 records generated\n",
      "180000 records generated\n",
      "Input file is generated\n"
     ]
    }
   ],
   "source": [
    "import csv\n",
    "filename = 'big_file.tmp'\n",
    "\n",
    "with open(filename, 'w') as data:\n",
    "    w = csv.writer(data, quoting=csv.QUOTE_ALL)\n",
    "    for i in range(200000):\n",
    "        w.writerow([float(i) for j in range(151)])\n",
    "        if i % 20000 == 0:\n",
    "            print('%d records generated' % i)\n",
    "print(\"Input file is generated\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In order to consume this file, let's write a CSV deserializer that will cut the given file in chunks of the specified size and  parse a particular chunk using __pandas__ module:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "class CSVDeserializer(UserDeserializer):\n",
    "    def __init__(self, filename, streams, chunksize = 32*1024*1024):\n",
    "        super(CSVDeserializer, self).__init__()\n",
    "        self._chunksize = chunksize\n",
    "        self._filename = filename\n",
    "        \n",
    "        # Create the information about streams\n",
    "        # based on the user provided data\n",
    "        self._streams = [cntk.io.StreamInformation(s['name'], i, 'dense', np.float32, s['shape'])\n",
    "                         for i, s in enumerate(streams)]\n",
    "        \n",
    "        # Define the number of chunks based on the file size\n",
    "        self._num_chunks = int(math.ceil(os.stat(filename).st_size/chunksize))\n",
    "        \n",
    "        # Based on the information provided by the user decide what column span\n",
    "        # belongs to which stream\n",
    "        self._offsets = [0]\n",
    "        for i, s in enumerate(self._streams):\n",
    "            self._offsets.append(s.sample_shape[0] + self._offsets[-1])\n",
    "\n",
    "    def stream_infos(self):\n",
    "        return self._streams\n",
    "\n",
    "    def num_chunks(self):\n",
    "        return self._num_chunks\n",
    "\n",
    "    # Ok, let's actually get the work done\n",
    "    def get_chunk(self, chunk_id):\n",
    "        fin = open(self._filename, \"rb\")\n",
    "        \n",
    "        # Some constants\n",
    "        endline = '\\n' if sys.version_info < (3,) else ord('\\n')\n",
    "        _64KB = 64 * 1024;\n",
    "        \n",
    "        # We would like to cut our chunk exactly on the line boundary.\n",
    "        # So let's make sure if the chunk starts in the middle\n",
    "        # of a row we move left to the beginning of this row\n",
    "        offset = chunk_id * self._chunksize\n",
    "        if offset != 0: # Need to find the beginning of the current row\n",
    "            while offset > 0:\n",
    "                offset -= _64KB # move left 64 KB\n",
    "                fin.seek(offset)\n",
    "                buf = fin.read(_64KB) # read the data\n",
    "                index = buf.rindex(endline) # find the last \\n and adapt the chunk offset\n",
    "                if index != -1: # Found, breaking\n",
    "                    offset += index\n",
    "                    break\n",
    "            if offset == 0:\n",
    "                raise ValueError('A single row does not fit into the chunk, consider increasing the chunk size')\n",
    "\n",
    "        # Now read the chunk data with adapted offset\n",
    "        fin.seek(offset)\n",
    "        size = (chunk_id + 1) * self._chunksize - offset\n",
    "        data = fin.read(size)\n",
    "        last_endline = data.rindex(endline) # Make sure we drop the last partial line\n",
    "                                            # It will be consumed by the next chunk\n",
    "        if last_endline == -1:\n",
    "            raise ValueError('A single row does not fit into the chunk, consider increasing the chunk size')\n",
    "        data = data[:last_endline + 1]\n",
    "        \n",
    "        # Parse the csv using pandas\n",
    "        df = pd.read_csv(io.BytesIO(data), engine='c', dtype=np.float32, header=None)\n",
    "\n",
    "        # Create a dictionary {name => data}, \n",
    "        # where data spans the number of columns specified by the user\n",
    "        result = {}\n",
    "        mat = df.as_matrix()\n",
    "        for i, stream in enumerate(self._streams):\n",
    "            result[stream.m_name] = np.ascontiguousarray(mat[:, self._offsets[i]:self._offsets[i + 1]])\n",
    "        return result\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's read through the data using the corresponding minibatch source:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "...............\n",
      "Total number of samples 200000, speed 23691.068044 samples per second\n"
     ]
    }
   ],
   "source": [
    "import time\n",
    "\n",
    "d = CSVDeserializer(filename=filename, streams=[dict(name='x', shape=(150,)), dict(name='y', shape=(1,))])\n",
    "mbs = MinibatchSource([d], randomize=False, max_sweeps=1)\n",
    "\n",
    "total_num_samples = 0\n",
    "start = time.time()\n",
    "while True:\n",
    "    mb = mbs.next_minibatch(128)\n",
    "    if not mb:\n",
    "        break\n",
    "    total_num_samples += mb[mbs.streams.x].number_of_samples\n",
    "    if total_num_samples % 12800 == 0:\n",
    "        sys.stdout.write('.')\n",
    "end = time.time()\n",
    "print()\n",
    "print('Total number of samples %d, speed %f samples per second' % (total_num_samples, total_num_samples/(end-start)))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Randomization\n",
    "\n",
    "The randomization is enabled simply by instantiating the [MinibatchSource](https://cntk.ai/pythondocs/cntk.io.html?#cntk.io.MinibatchSource) with _randomize=True_. Randomization happens each sweep and is two-fold: firstly all chunks are randomized, then the window of chunks is created (controlled using `randomization_window_in_chunks` or `randomization_window_in_samples` parameters) and all sequences inside the window are randomized in their own turn.\n",
    "\n",
    "## 4. Checkpointing\n",
    "\n",
    "Checkpointing is done transparently for the deserializer. You can use [get_checkpoint_state](https://cntk.ai/pythondocs/_modules/cntk/io.html#MinibatchSource.get_checkpoint_state) and [restore_from_checkpoint](https://cntk.ai/pythondocs/_modules/cntk/io.html#MinibatchSource.restore_from_checkpoint) on the [MinibatchSource](https://cntk.ai/pythondocs/cntk.io.html?#cntk.io.MinibatchSource).\n",
    "\n",
    "## 5. Distribution\n",
    "\n",
    "Distribution is done transparently for the deserializer. In case of randomization, the distribution is based on the chunk id, in non randomized mode - on the sequence position in the sweep.\n",
    "\n",
    "## 6. Threading\n",
    "\n",
    "__get_chunk__ is executed on the prefetch thread.\n",
    "Please be aware that [real multithreading is not possible in CPython](https://docs.python.org/3.6/library/threading.html): \n",
    "> _In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously._\n",
    "\n"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
